/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iotdb.db.wal.checkpoint;

import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.wal.io.CheckpointWriter;
import org.apache.iotdb.db.wal.io.ILogWriter;
import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** This class is used to manage checkpoints of one wal node */
public class CheckpointManager implements AutoCloseable {
  private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

  /** WALNode identifier of this checkpoint manager */
  protected final String identifier;
  /** directory to store .checkpoint file */
  protected final String logDirectory;
  /**
   * protect concurrent safety of checkpoint info, including memTableId2Info, cachedByteBuffer,
   * currentLogVersion and currentLogWriter
   */
  private final Lock infoLock = new ReentrantLock();
  // region these variables should be protected by infoLock
  /** memTable id -> memTable info */
  private final Map<Long, MemTableInfo> memTableId2Info = new HashMap<>();
  /** cache the biggest byte buffer to serialize checkpoint */
  private volatile ByteBuffer cachedByteBuffer;
  /** max memTable id */
  private long maxMemTableId = 0;
  /** current checkpoint file version id, only updated by fsyncAndDeleteThread */
  private int currentCheckPointFileVersion = 0;
  /** current checkpoint file log writer, only updated by fsyncAndDeleteThread */
  private ILogWriter currentLogWriter;
  // endregion

  public CheckpointManager(String identifier, String logDirectory) throws FileNotFoundException {
    this.identifier = identifier;
    this.logDirectory = logDirectory;
    File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
    if (!logDirFile.exists() && logDirFile.mkdirs()) {
      logger.info("create folder {} for wal buffer-{}.", logDirectory, identifier);
    }
    currentLogWriter =
        new CheckpointWriter(
            SystemFileFactory.INSTANCE.getFile(
                logDirectory, CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion)));
    logHeader();
  }

  private void logHeader() {
    infoLock.lock();
    try {
      // log max memTable id
      ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES);
      tmpBuffer.putLong(maxMemTableId);
      try {
        currentLogWriter.write(tmpBuffer);
      } catch (IOException e) {
        logger.error("Fail to log max memTable id: {}", maxMemTableId, e);
      }
      // log global memTables' info
      makeGlobalInfoCP();
    } finally {
      infoLock.unlock();
    }
  }

  /**
   * make checkpoint for global memTables' info, this checkpoint only exists in the beginning of
   * each checkpoint file
   */
  private void makeGlobalInfoCP() {
    Checkpoint checkpoint =
        new Checkpoint(
            CheckpointType.GLOBAL_MEMORY_TABLE_INFO, new ArrayList<>(memTableId2Info.values()));
    logByCachedByteBuffer(checkpoint);
  }

  /** make checkpoint for create memTable info */
  public void makeCreateMemTableCP(MemTableInfo memTableInfo) {
    infoLock.lock();
    try {
      maxMemTableId = Math.max(maxMemTableId, memTableInfo.getMemTableId());
      memTableId2Info.put(memTableInfo.getMemTableId(), memTableInfo);
      Checkpoint checkpoint =
          new Checkpoint(
              CheckpointType.CREATE_MEMORY_TABLE, Collections.singletonList(memTableInfo));
      logByCachedByteBuffer(checkpoint);
    } finally {
      infoLock.unlock();
    }
  }

  /** make checkpoint for flush memTable info */
  public void makeFlushMemTableCP(long memTableId) {
    infoLock.lock();
    try {
      MemTableInfo memTableInfo = memTableId2Info.remove(memTableId);
      if (memTableInfo == null) {
        return;
      }
      Checkpoint checkpoint =
          new Checkpoint(
              CheckpointType.FLUSH_MEMORY_TABLE, Collections.singletonList(memTableInfo));
      logByCachedByteBuffer(checkpoint);
    } finally {
      infoLock.unlock();
    }
  }

  private void logByCachedByteBuffer(Checkpoint checkpoint) {
    // make sure cached ByteBuffer has enough capacity
    int estimateSize = checkpoint.serializedSize();
    if (cachedByteBuffer == null || estimateSize > cachedByteBuffer.capacity()) {
      cachedByteBuffer = ByteBuffer.allocate(estimateSize);
    }
    checkpoint.serialize(cachedByteBuffer);

    try {
      currentLogWriter.write(cachedByteBuffer);
    } catch (IOException e) {
      logger.error("Fail to make checkpoint: {}", checkpoint, e);
    } finally {
      cachedByteBuffer.clear();
    }

    fsyncCheckpointFile();
  }

  // region Task to fsync checkpoint file
  /** Fsync checkpoints to the disk */
  private void fsyncCheckpointFile() {
    infoLock.lock();
    try {
      try {
        currentLogWriter.force();
      } catch (IOException e) {
        logger.error(
            "Fail to fsync wal node-{}'s checkpoint writer, change system mode to read-only.",
            identifier,
            e);
        config.setReadOnly(true);
      }

      try {
        if (tryRollingLogWriter()) {
          // first log max memTable id and global memTables' info, then delete old checkpoint file
          logHeader();
          currentLogWriter.force();
          File oldFile =
              SystemFileFactory.INSTANCE.getFile(
                  logDirectory,
                  CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion - 1));
          oldFile.delete();
        }
      } catch (IOException e) {
        logger.error(
            "Fail to roll wal node-{}'s checkpoint writer, change system mode to read-only.",
            identifier,
            e);
        config.setReadOnly(true);
      }
    } finally {
      infoLock.unlock();
    }
  }

  private boolean tryRollingLogWriter() throws IOException {
    if (currentLogWriter.size() < config.getCheckpointFileSizeThresholdInByte()) {
      return false;
    }
    currentLogWriter.close();
    currentCheckPointFileVersion++;
    File nextLogFile =
        SystemFileFactory.INSTANCE.getFile(
            logDirectory, CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion));
    currentLogWriter = new CheckpointWriter(nextLogFile);
    return true;
  }
  // endregion

  /** Get MemTableInfo of oldest MemTable, whose first version id is smallest */
  public MemTableInfo getOldestMemTableInfo() {
    // find oldest memTable
    List<MemTableInfo> memTableInfos;
    infoLock.lock();
    try {
      memTableInfos = new ArrayList<>(memTableId2Info.values());
    } finally {
      infoLock.unlock();
    }
    if (memTableInfos.isEmpty()) {
      return null;
    }
    MemTableInfo oldestMemTableInfo = memTableInfos.get(0);
    for (MemTableInfo memTableInfo : memTableInfos) {
      if (oldestMemTableInfo.getFirstFileVersionId() > memTableInfo.getFirstFileVersionId()) {
        oldestMemTableInfo = memTableInfo;
      }
    }
    return oldestMemTableInfo;
  }

  /**
   * Get version id of first valid .wal file
   *
   * @return Return {@link Long#MIN_VALUE} if no file is valid
   */
  public long getFirstValidWALVersionId() {
    List<MemTableInfo> memTableInfos;
    infoLock.lock();
    try {
      memTableInfos = new ArrayList<>(memTableId2Info.values());
    } finally {
      infoLock.unlock();
    }
    long firstValidVersionId = memTableInfos.isEmpty() ? Long.MIN_VALUE : Long.MAX_VALUE;
    for (MemTableInfo memTableInfo : memTableInfos) {
      firstValidVersionId = Math.min(firstValidVersionId, memTableInfo.getFirstFileVersionId());
    }
    return firstValidVersionId;
  }

  /** Get total cost of active memTables */
  public long getTotalCostOfActiveMemTables() {
    long totalCost = 0;

    if (!config.isEnableMemControl()) {
      infoLock.lock();
      try {
        totalCost = memTableId2Info.size();
      } finally {
        infoLock.unlock();
      }
    } else {
      List<MemTableInfo> memTableInfos;
      infoLock.lock();
      try {
        memTableInfos = new ArrayList<>(memTableId2Info.values());
      } finally {
        infoLock.unlock();
      }
      for (MemTableInfo memTableInfo : memTableInfos) {
        totalCost += memTableInfo.getMemTable().getTVListsRamCost();
      }
    }

    return totalCost;
  }

  @Override
  public void close() {
    infoLock.lock();
    try {
      if (currentLogWriter != null) {
        try {
          currentLogWriter.close();
        } catch (IOException e) {
          logger.error("Fail to close wal node-{}'s checkpoint writer.", identifier, e);
        }
      }
    } finally {
      infoLock.unlock();
    }
  }
}
